home *** CD-ROM | disk | FTP | other *** search
/ Cream of the Crop 26 / Cream of the Crop 26.iso / os2 / pvm34b3.zip / pvm34b3 / pvm3 / src / lpvmmimd.c < prev    next >
C/C++ Source or Header  |  1997-07-22  |  50KB  |  2,013 lines

  1.  
  2. static char rcsid[] =
  3.     "$Id: lpvmmimd.c,v 1.16 1997/06/27 17:32:27 pvmsrc Exp $";
  4.  
  5. /*
  6.  *         PVM version 3.4:  Parallel Virtual Machine System
  7.  *               University of Tennessee, Knoxville TN.
  8.  *           Oak Ridge National Laboratory, Oak Ridge TN.
  9.  *                   Emory University, Atlanta GA.
  10.  *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
  11.  *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
  12.  *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
  13.  *                   (C) 1997 All Rights Reserved
  14.  *
  15.  *                              NOTICE
  16.  *
  17.  * Permission to use, copy, modify, and distribute this software and
  18.  * its documentation for any purpose and without fee is hereby granted
  19.  * provided that the above copyright notice appear in all copies and
  20.  * that both the copyright notice and this permission notice appear in
  21.  * supporting documentation.
  22.  *
  23.  * Neither the Institutions (Emory University, Oak Ridge National
  24.  * Laboratory, and University of Tennessee) nor the Authors make any
  25.  * representations about the suitability of this software for any
  26.  * purpose.  This software is provided ``as is'' without express or
  27.  * implied warranty.
  28.  *
  29.  * PVM version 3 was funded in part by the U.S. Department of Energy,
  30.  * the National Science Foundation and the State of Tennessee.
  31.  */
  32.  
  33. /*
  34.  *    lpvmmimd.c
  35.  *
  36.  *    Libpvm core for MPP environment.
  37.  *
  38. $Log: lpvmmimd.c,v $
  39.  * Revision 1.16  1997/06/27  17:32:27  pvmsrc
  40.  * Updated for WIN32 header files & Authors.
  41.  *
  42.  * Revision 1.15  1997/06/12  20:10:44  pvmsrc
  43.  * Made sure all communications for TC_* task control messages
  44.  *     use the SYSCTX_TC system context.
  45.  *     - some messages being sent in default context...  D-Oh...
  46.  *
  47.  * Revision 1.14  1997/05/29  15:13:35  pvmsrc
  48.  * Removed static decls:
  49.  *     - pvmtrcsbf, pvmrouteopt now in lpvmglob.c / lpvm.h.
  50.  *     - pvmtrcmid doesn't exist.
  51.  *
  52.  * Revision 1.13  1997/04/30  21:26:05  pvmsrc
  53.  * SGI Compiler Warning Cleanup.
  54.  *
  55.  * Revision 1.12  1997/04/07  21:09:12  pvmsrc
  56.  * pvm_addmhf() - new paramter interface
  57.  *
  58.  * Revision 1.11  1997/04/01  21:28:16  pvmsrc
  59.  * Damn Damn Damn.
  60.  *     - pvm_recvinfo() returns a bufid, not an index.  Damn.
  61.  *
  62.  * Revision 1.10  1997/04/01  20:48:19  pvmsrc
  63.  * Fixed tracer mbox usage:
  64.  *     - pvm_getinfo() -> pvm_recvinfo(), new semantics handled (recvinfo
  65.  *         sets rbuf implicitly, a la pvm_recv, need to save rbuf).
  66.  *
  67.  * Revision 1.9  1997/03/27  19:55:29  pvmsrc
  68.  * Fixed up pvmbeatask() to go get tracer info if spawned from shell:
  69.  *     - env var info including trace mask, trace buffer size, trace opts.
  70.  *     - use PVMTRACERCLASS mbox entry to fill in values, if matches
  71.  *         on trctid, trcctx, and trctag.
  72.  *
  73.  * Revision 1.8  1997/03/06  21:50:19  pvmsrc
  74.  * Yanked out #includes for <netinet/in.h> and <netinet/tcp.h>.
  75.  *     - dups with lpvm.h #includes...
  76.  *
  77.  * Revision 1.7  1997/01/28  19:26:29  pvmsrc
  78.  * New Copyright Notice & Authors.
  79.  *
  80.  * Revision 1.6  1996/12/19  20:17:04  pvmsrc
  81.  * Replaced old struct umbuf with new struct pmsg.
  82.  *
  83.  * Revision 1.5  1996/12/19  19:57:56  pvmsrc
  84.  * Eradicated remainder of old control message interface.
  85.  *     - replaced pvmmctl() routine with individual messages handlers:
  86.  *         * pvm_tc_shmat() (lpvmshmem.c only).
  87.  *         * pvm_tc_conreq().
  88.  *         * pvm_tc_conack().
  89.  *         * pvm_tc_taskexit().
  90.  *     - added appropriate calls to pvm_addmhf() in pvmbeatask().
  91.  *     - removed calls to pvmmctl() in mroute() (peer_recv() in
  92.  *         lpvmshmem.c), replaced with new mesg_input() call,
  93.  *         use new pmsg_setenc() routine to set message encoding.
  94.  *
  95.  * Revision 1.4  1996/12/18  22:27:48  pvmsrc
  96.  * Extracted duplicate versions of routines from lpvm/mimd/shmem.c,
  97.  *     inserted into shared lpvmgen.c:
  98.  *     - pvmbailout().
  99.  *     - pvmlogerror().
  100.  *     - vpvmlogprintf(), pvmlogprintf().  (hope these work on MPP & shmem)
  101.  *     - pvmlogperror().
  102.  *
  103.  * Revision 1.3  1996/10/25  13:57:27  pvmsrc
  104.  * Replaced old #includes for protocol headers:
  105.  *     - <pvmsdpro.h>, "ddpro.h", "tdpro.h"
  106.  * With #include of new combined header:
  107.  *     - <pvmproto.h>
  108.  *
  109.  * Revision 1.2  1996/10/24  22:44:33  pvmsrc
  110.  * Modified for New Tracing Facility:
  111.  *     - moved #include "global.h" below other #include's for typing.
  112.  *     - removed extra #include <pvm3.h> in lpvm.c...
  113.  *     - added #include of new "lpvm.h" to replace explicit externs.
  114.  *     - removed common control message handlers from lpvm.c:
  115.  *         * extracted to lpvmgen.c for general usage.
  116.  *         * pvm_tc_noop(), pvm_tc_settmask(), pvm_tc_siblings().
  117.  *         -> lpvmmimd.c & lpvmshmem.c still need remainder of pvmmctl()
  118.  *             replaced with control message handlers.
  119.  *     - arg typing hassles with int_compare() / qsort() exacerbated...
  120.  *     - modified pvmbeatask():
  121.  *         * handle new tracing info, unpack tracing and output collection
  122.  *             parameters into temp storage, and then check for local task
  123.  *             override before applying.
  124.  *         * read in new tracing env vars PVMTRCBUF & PVMTRCOPT.
  125.  *         * install new common message handlers.
  126.  *         * call new tev_init() routine to set up tracing stuff.
  127.  *         * use new Pvmtracer structures (pvmtrc & pvmctrc) to store info.
  128.  *     - removed pvm_getopt() & pvm_setopt() -> moved to common lpvmgen.c.
  129.  *     - removed old tev_begin(), tev_fin() & tev_do_trace() routines.
  130.  *     - updated trace event generation for pvm_getfds(), pvm_start_pvmd(),
  131.  *         pvm_precv(), pvm_psend().
  132.  *
  133.  * Revision 1.1  1996/09/23  23:44:17  pvmsrc
  134.  * Initial revision
  135.  *
  136.  * Revision 1.20  1995/11/02  16:12:23  manchek
  137.  * free replies to control messages in mxfer
  138.  *
  139.  * Revision 1.19  1995/11/02  16:11:15  manchek
  140.  * removed hdump()
  141.  *
  142.  * Revision 1.18  1995/09/06  17:37:25  manchek
  143.  * aargh, forgot pvm_precv
  144.  *
  145.  * Revision 1.17  1995/09/06  17:32:39  manchek
  146.  * pvm_psend returns not implemented instead of bad param for string type
  147.  *
  148.  * Revision 1.16  1995/09/05  19:16:11  manchek
  149.  * changed some comments
  150.  *
  151.  * Revision 1.15  1995/07/28  16:40:58  manchek
  152.  * wrap HASERRORVARS around errno declarations
  153.  *
  154.  * Revision 1.14  1995/07/18  19:03:35  manchek
  155.  * added code to generate and check crc on each message (MCHECKSUM)
  156.  *
  157.  * Revision 1.13  1995/06/28  18:19:09  manchek
  158.  * do-nothing check_for_exit so one can be in lpvmshmem.c
  159.  *
  160.  * Revision 1.12  1995/06/19  17:49:22  manchek
  161.  * was packing random string in TC_CONACK message in pvmmctl
  162.  *
  163.  * Revision 1.11  1995/06/16  16:07:54  manchek
  164.  * set debug mask and trace mask from environment vars.
  165.  * hack to set trace and output sink and tag for PGON
  166.  *
  167.  * Revision 1.10  1995/06/12  15:58:52  manchek
  168.  * added PGON partition size support
  169.  *
  170.  * Revision 1.9  1995/05/30  17:29:52  manchek
  171.  * Added ifdefs for SP2MPI architecture.
  172.  * Fixed bug in pvm_precv.
  173.  * Use asyncsend and probe for incoming messages in pvm_psend.
  174.  * Prefix my_node, etc. with "pvm" and make them static to avoid name clashes.
  175.  * Fix in mroute() to handle null message
  176.  *
  177.  * Revision 1.8  1995/02/01  21:11:45  manchek
  178.  * error 4 is now PvmOverflow
  179.  *
  180.  * Revision 1.7  1994/12/20  16:39:05  manchek
  181.  * added pvmshowtaskid variable
  182.  *
  183.  * Revision 1.6  1994/11/07  22:39:59  manchek
  184.  * Modify node_mcast() to handle DataInPlace correctly.
  185.  * Modify pvm_precv() to deal with packets from service nodes.  Those
  186.  * packets have PVM headers.
  187.  * Change the limit on pvmfrgsiz in pvm_setopt() to MAXFRAGSIZE.
  188.  * Respond to RouteDirect requests
  189.  *
  190.  * Revision 1.5  1994/09/02  15:25:03  manchek
  191.  * fixed typos in pvm_precv - rtid should be rlen
  192.  *
  193.  * Revision 1.4  1994/06/03  20:38:17  manchek
  194.  * version 3.3.0
  195.  *
  196.  * Revision 1.3  1993/12/20  15:39:15  manchek
  197.  * patch 6 from wcj
  198.  *
  199.  * Revision 1.1  1993/08/30  23:26:48  manchek
  200.  * Initial revision
  201.  *
  202.  */
  203.  
  204. #include <stdio.h>
  205. #include <rpc/types.h>
  206. #include <rpc/xdr.h>
  207. #include <sys/stat.h>
  208. #ifdef IMA_CM5
  209. #include <unistd.h>
  210. #endif
  211. #include <fcntl.h>
  212. #ifdef    SYSVSTR
  213. #include <string.h>
  214. #else
  215. #include <strings.h>
  216. #endif
  217. #include <errno.h>
  218. #ifdef IMA_PGON
  219. #include <nx.h>
  220. #endif
  221. #ifdef IMA_I860
  222. #include <cube.h>
  223. #endif
  224. #ifdef IMA_SP2MPI
  225. #include <sys/socket.h>
  226. #include <sys/select.h>
  227. #include "mpi.h"
  228. #endif
  229. #include <pvm3.h>
  230. #include <pvmproto.h>
  231. #include "pvmalloc.h"
  232. #include "pvmfrag.h"
  233. #include "pmsg.h"
  234. #include "listmac.h"
  235. #include "pvmdmp.h"
  236. #include "pvmmimd.h"
  237. #include "bfunc.h"
  238. #include "lpvm.h"
  239. #include <pvmtev.h>
  240. #include "tevmac.h"
  241. #include "global.h"
  242.  
  243. #ifndef max
  244. #define max(a,b)    ((a)>(b)?(a):(b))
  245. #endif
  246.  
  247. char *getenv();
  248. void hex_inadport __ProtoGlarp__ (( char *, struct sockaddr_in * ));
  249.  
  250. extern struct encvec *enctovec();
  251.  
  252.  
  253. /***************
  254.  **  Globals  **
  255.  **           **
  256.  ***************/
  257.  
  258. #ifndef HASERRORVARS
  259. extern int errno;                    /* from libc */
  260. extern char *sys_errlist[];
  261. extern int sys_nerr;
  262. #endif
  263.  
  264. int pvmtidhmask = TIDHOST;            /* mask - host field of tids */
  265. int pvmtidnmask = TIDNODE;            /* mask - node field of tids */
  266. int pvmtidtmask = TIDPTYPE;            /* mask - proc type field of tids */
  267.  
  268. int pvmpgonpartsize = 0;            /* pgon parent partition size */
  269.  
  270.  
  271. /***************
  272.  **  Private  **
  273.  **           **
  274.  ***************/
  275.  
  276. static int pvmdebmask = 0;                /* which debugging info */
  277. static char pvmtxt[512];                /* scratch for error log */
  278. static struct pmsg *rxfrag = 0;            /* not-assembled incm msgs */
  279. static int mpierrcode = 0;                /* error code returned by MPI calls */
  280. static struct tmpfrag outfrags[NUMSMHD];/* fragments queued by async send */
  281. static int nextsmhd = 0;                /* index of current isend mhdl */
  282. #if defined(IMA_CM5) || defined(IMA_SP2MPI)
  283. static struct pmsg *precvlist = 0;        /* not-processed incm msgs */
  284. #endif
  285. static long pvmmyptype = 0;                /* my process type */
  286. static long pvmhostnode = -1;                /* host node number */
  287. static int pvmmynode = -1;                /* my node number */
  288. #ifdef IMA_I860
  289. static int pvmmyvnode = -1;                /* my virtual node number */
  290. #endif
  291. static int mysetpart = 0;                /* host field & set field */
  292.  
  293. /* static int recvmask = 0x80000000 + (1<<PMTDN) + (1<<PMTNN) + (1<<PMTMCAST);*/
  294.  
  295.  
  296. /**************************
  297.  **  Internal Functions  **
  298.  **                      **
  299.  **************************/
  300.  
  301.  
  302. /*    pvm_tc_conreq()
  303. *
  304. *    Another task requests a connection with us.
  305. *    Reply with a TC_CONACK message.
  306. *
  307. *    TC_CONREQ() {
  308. *        int tdprotocol        // t-d protocol revision number
  309. *        string sockaddr        // address of other socket
  310. *    }
  311. */
  312.  
  313. static int
  314. pvm_tc_conreq(mid)
  315.     int mid;
  316. {
  317.     int src;                /* sender of request */
  318.     int rbf;                /* temp rx message storage */
  319.     int sbf = 0;            /* return message to send */
  320.     int ttpro;                /* protocol revision */
  321.     int ackd;                /* allow connection if 0 */
  322.     int i;
  323.     int ictx;
  324.  
  325.     rbf = pvm_setrbuf(mid);
  326.     pvm_bufinfo(mid, (int *)0, (int *)0, &src);
  327.  
  328.     pvmlogprintf("pvm_tc_conreq() TCP conn request from t%x!\n", src);
  329.  
  330.     sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
  331.     ttpro = TDPROTOCOL;
  332.     ackd = 1;
  333.     pvm_pkint(&ttpro, 1, 1);
  334.     pvm_pkint(&ackd, 1, 1);
  335.     pvm_pkstr("");
  336. i = pvmrescode;
  337. pvmrescode = 1;
  338.     ictx = pvm_setcontext(SYSCTX_TC);
  339.     pvm_send(src, TC_CONACK);
  340.     pvm_setcontext(ictx);
  341. pvmrescode = i;
  342.     pvm_freebuf(pvm_setsbuf(sbf));
  343.  
  344.     pvm_setrbuf(rbf);
  345.     pvm_freebuf(mid);
  346.     return 0;
  347. }
  348.  
  349.  
  350. /*    pvm_tc_conack()
  351. *
  352. *    Another task replies to our connection request.
  353. *
  354. *    TC_CONACK() {
  355. *        int tdprotocol        // t-d protocol revision number
  356. *        int ack                // 0 ok, 1 denied
  357. *        string sockaddr        // address of other socket
  358. *    }
  359. */
  360.  
  361. static int
  362. pvm_tc_conack(mid)
  363.     int mid;
  364. {
  365.     int src;                /* sender of reply */
  366.     int rbf;                /* temp rx message storage */
  367.  
  368.     rbf = pvm_setrbuf(mid);
  369.     pvm_bufinfo(mid, (int *)0, (int *)0, &src);
  370.  
  371.     pvmlogprintf("pvm_tc_conack() unexpected TC msg from t%x!\n", src);
  372.  
  373.     pvm_setrbuf(rbf);
  374.     pvm_freebuf(mid);
  375.     return 0;
  376. }
  377.  
  378.  
  379. /*    pvm_tc_taskexit()
  380. *
  381. *    We are notified that another task (to which we have a direct route)
  382. *    has exited.
  383. */
  384.  
  385. static int
  386. pvm_tc_taskexit(mid)
  387.     int mid;
  388. {
  389.     int src;                /* sender of notify */
  390.     int rbf;                /* temp rx message storage */
  391.  
  392.     rbf = pvm_setrbuf(mid);
  393.     pvm_bufinfo(mid, (int *)0, (int *)0, &src);
  394.  
  395.     pvmlogprintf("pvm_tc_taskexit() unexpected TC msg from t%x!\n",
  396.             src);
  397.  
  398.     pvm_setrbuf(rbf);
  399.     pvm_freebuf(mid);
  400.     return 0;
  401. }
  402.  
  403.  
  404. /*    mroute()
  405. *
  406. *   Route a message to a destination.
  407. *   Returns when
  408. *       outgoing message (if any) fully sent and
  409. *       (timed out (tmout) or
  410. *           at least one message fully received)
  411. *   Returns >=0 the number of complete messages downloaded, or
  412. *   negative on error.
  413.  
  414. */
  415.  
  416. int
  417. mroute(mid, dtid, code, tmout)
  418.     int mid;                /* message */
  419.     int dtid;                /* dest */
  420.     int code;                /* type code */
  421.     struct timeval *tmout;    /* get at least one message */
  422. {
  423.     struct pmsg *txup;            /* tx message or null */
  424.     struct frag *txfp = 0;        /* cur tx frag or null */
  425.     int gotem = 0;                /* count complete msgs downloaded */
  426.     struct pmsg *rxup;            /* rx message */
  427.     struct frag *fp;
  428.     char *cp = 0;
  429.     int src;
  430.     int dst;
  431.     int ff;
  432.     int block;                    /* get at least one message */
  433.     struct timeval tnow, tstop;
  434.     int len;
  435.     long node;
  436. #ifdef IMA_SP2MPI
  437.     MPI_Status info;
  438. #else
  439.     long info[8];                /* info about pending message */
  440. #endif
  441. #ifdef IMA_SP2MPI
  442.     int mpiflag = 0;
  443.     int mpisiz;
  444. #endif
  445.     
  446.  
  447.     /* XXX do we really have to do this? */
  448.     if ((dtid == TIDPVMD && code == TM_MCA) || dtid == TIDGID)
  449.         return node_mcast(mid, dtid, code);
  450.  
  451.     if (tmout) {
  452.         if (tmout->tv_sec || tmout->tv_usec) {
  453.             pvmgetclock(&tnow);
  454.             tstop.tv_sec = tnow.tv_sec + tmout->tv_sec;
  455.             tstop.tv_usec = tnow.tv_usec + tmout->tv_usec;
  456.             block = 1;
  457.         } else
  458.             block = 0;
  459.     } else {
  460.         block = 1;
  461.         tstop.tv_sec = -1;
  462.         tstop.tv_usec = -1;
  463.     }
  464.  
  465.     if (txup = midtobuf(mid)) {
  466.         txfp = txup->m_frag->fr_link;
  467.         if (!txfp->fr_buf) {
  468.             txfp = fr_new(MAXHDR);
  469.             txfp->fr_dat += MAXHDR;
  470.             LISTPUTBEFORE(txup->m_frag, txfp, fr_link, fr_rlink);
  471.         }
  472.         node_send(txup, txfp, dtid, code);
  473.         if (!block)
  474.             return 0;
  475.     }
  476.  
  477.     do {
  478.         if (block && tstop.tv_sec != -1) {
  479.             pvmgetclock(&tnow);
  480.             if (tnow.tv_sec > tstop.tv_sec 
  481.             || (tnow.tv_sec == tstop.tv_sec && tnow.tv_usec >= tstop.tv_usec)) 
  482.                 break;
  483.         }
  484.  
  485.         /* 
  486.          * Receive PMTPACK msg only; psent msg can have any (user-supplied)
  487.          * type not equal to PMTPACK. They're handled by precv.
  488.          */
  489.         node = MPPANYNODE;
  490.         if (MSGPROBE(node, PMTPACK, info)) {
  491.             /* 
  492.              * 1. Null msg (except I860) from pvmd, followed by the real msg.
  493.              * 2. Header (inplace), followed by msg body.
  494.              * 3. Entire msg from another node.
  495.              */
  496.             if (len = MSGSIZE(info)) {
  497.                 fp = fr_new(len);
  498.                 cp = fp->fr_dat;
  499.             } 
  500.             if ((mpierrcode = PVMCRECV(node, PMTPACK, cp, len, 0, info))
  501. #if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
  502.             && (mpierrcode < 0)
  503. #endif
  504.             ) {
  505.                 pvmlogperror("mroute() PVMCRECV");
  506.                 return PvmSysErr;
  507.             }
  508.             if (!len) {
  509.                 /*
  510.                  * Discard null msg. Now we must wait for the real (packed)
  511.                  * msg, otherwise it could get picked up later by precv and
  512.                  * cause problems.
  513.                  */
  514.                 block = 1;
  515.                 continue;
  516.             }
  517.             if ((len = pvmget32(cp + 8) + TDFRAGHDR) != fp->fr_max) {
  518.                 struct frag *hdr;        /* buffer for header */
  519.  
  520.                 /* inplace data, head & body sent separately */
  521.                 hdr = fp;
  522.                 fp = fr_new(len);
  523.                 len = hdr->fr_max;
  524.                 BCOPY(hdr->fr_dat, fp->fr_dat, len);
  525.                 fr_unref(hdr);
  526.                 node = MSGSENDER(info);
  527. #ifdef IMA_I860
  528.                 /* msg type is (virtual) node # */
  529.                 if (_crecv(node, fp->fr_dat + len, fp->fr_max - len) < 0) {
  530. #else
  531.                 if ((mpierrcode = PVMCRECV(node, PMTPACK, fp->fr_dat + len, 
  532.                 fp->fr_max - len, 0, info))
  533. #if defined(IMA_PGON) || defined(IMA_CM5)
  534.                 && (mpierrcode < 0)
  535. #endif
  536.                 ) {
  537. #endif
  538.                     pvmlogperror("mroute() PVMCRECV msg body");
  539.                     return PvmSysErr;
  540.                 }
  541.                 cp = fp->fr_dat;
  542.             } 
  543.  
  544.             /* fp->fr_len = fp->fr_max; */
  545.             dst = pvmget32(cp);
  546.             src = pvmget32(cp + 4);
  547.             fp->fr_len = pvmget32(cp + 8);
  548.             ff = pvmget8(cp + 12);
  549.             /* fp->fr_len -= TDFRAGHDR; */
  550.             fp->fr_dat += TDFRAGHDR;
  551.             if (pvmdebmask & PDMPACKET) {
  552.                 sprintf(pvmtxt, "mroute() src t%x len %d dst t%x\n",
  553.                     src, fp->fr_len, dst);
  554.                 pvmlogerror(pvmtxt);
  555.             }
  556.     /*
  557.     * if start of message, make new umbuf, add to frag pile
  558.     */
  559.             if (ff & FFSOM) {
  560.                 cp += TDFRAGHDR;
  561.                 fp->fr_len -= TTMSGHDR;
  562.                 fp->fr_dat += TTMSGHDR;
  563.                 rxup = umbuf_new();
  564.                 rxup->m_tag = pvmget32(cp);
  565.                 rxup->m_enc = pvmget32(cp + 4);
  566.                 rxup->m_wid = pvmget32(cp + 8);
  567.                 rxup->m_crc = pvmget32(cp + 12);
  568.                 rxup->m_src = src;
  569.                 LISTPUTBEFORE(rxfrag, rxup, m_link, m_rlink);
  570.             }
  571.  
  572.     /* locate frag's message */
  573.  
  574.             for (rxup = rxfrag->m_link; rxup != rxfrag; rxup = rxup->m_link)
  575.                 if (rxup->m_src == src)
  576.                     break;
  577.  
  578.             if (rxup == rxfrag) {    /* uh oh, no message for it */
  579.                 pvmlogerror("mroute() frag with no message\n");
  580.                 fr_unref(fp);
  581.  
  582.             } else {
  583.                 LISTPUTBEFORE(rxup->m_frag, fp, fr_link, fr_rlink);
  584.                 rxup->m_len += fp->fr_len;
  585.     /*
  586.     * if end of message, move to rxlist and count it
  587.     */
  588.                 if (ff & FFEOM) {
  589.                     LISTDELETE(rxup, m_link, m_rlink);
  590. #ifdef    MCHECKSUM
  591.                     if (rxup->m_crc != umbuf_crc(rxup)) {
  592.                         sprintf(pvmtxt,
  593.                         "mxinput() message src t%x cod %d bad checksum\n",
  594.                                 rxup->m_src, rxup->m_tag);
  595.                         pvmlogerror(pvmtxt);
  596.                         umbuf_free(rxup);
  597.  
  598.                     } else {
  599. #endif
  600.                         pmsg_setenc(rxup, rxup->m_enc);
  601.                         mesg_input(rxup);
  602.                         gotem++;
  603. #ifdef    MCHECKSUM
  604.                     }
  605. #endif
  606.                 }
  607.             }
  608.         }
  609.  
  610.     } while (block && !gotem);
  611.  
  612.     return gotem;
  613. }
  614.  
  615.  
  616. /* sends a frag to another process */
  617. int
  618. node_send(txup, txfp, dtid, code)
  619.     struct pmsg *txup;        /* tx message or null */
  620.     struct frag *txfp;        /* cur tx frag or null */
  621.     int dtid;                /* dest */
  622.     int code;                /* type code */
  623. {
  624.     int mask = pvmtidhmask;        /* host */
  625.     char *txcp = 0;                /* point to remainder of txfp */
  626.     int txtogo = 0;                /* len of txfp */
  627.     long node;                    /* destination node */
  628.     long ptype;                    /* destination process type */
  629.     int i;
  630.     int ff;
  631.     char dummy[TDFRAGHDR+TTMSGHDR];    /* for inplace data */
  632. #ifdef IMA_SP2MPI
  633.     MPI_Status mpista;
  634.     int mpiflag = 0;
  635. #endif
  636.  
  637. #if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)
  638.     mask |= pvmtidtmask;                /* process type */
  639. #endif
  640.  
  641.     if (TIDISNODE(dtid) && (dtid & mask) == (pvmmytid & mask)) {
  642.         node = dtid & pvmtidnmask;
  643.         ptype = pvmmyptype;            /* send to node directly */
  644.     } else {
  645.         node = pvmhostnode;
  646.         ptype = PVMDPTYPE;            /* send to pvmd first */
  647.     }
  648.  
  649.     do {
  650.  
  651.     /* check any pending sends; free data if send is completed */
  652.  
  653.         if (nextsmhd == NUMSMHD)
  654.             nextsmhd = 0;
  655.         i = nextsmhd;
  656.  
  657.         while (outfrags[i].tf_mhdl != PVMMHDNULL &&
  658.         ASYNCDONE(outfrags[i].tf_mhdl) > 0) {
  659.             fr_unref(outfrags[i].tf_fp);
  660. #ifdef IMA_CM5
  661.             CMMD_free_mcb(outfrags[i].tf_mhdl);
  662. #endif
  663.             outfrags[i++].tf_mhdl = PVMMHDNULL;
  664.         }
  665.  
  666.         if (outfrags[nextsmhd].tf_mhdl != PVMMHDNULL) {
  667.             if (pvmdebmask & PDMPACKET)
  668.                 pvmlogerror("out of mid's?");
  669.             nextsmhd++;        /* don't get stuck here; check out next mhd */
  670.             continue;
  671.         }
  672.  
  673.         if (txfp->fr_u.dab)         /* packed data */
  674.             txcp = txfp->fr_dat;
  675.         else                         /* inplace data */
  676.             txcp = dummy + sizeof(dummy);
  677.         txtogo = txfp->fr_len;
  678.  
  679.     /*
  680.     * if this is first frag, prepend t-t header
  681.     */
  682.         ff = 0;
  683.         if (txfp->fr_rlink == txup->m_frag) {
  684.             txcp -= TTMSGHDR;
  685.             txtogo += TTMSGHDR;
  686.             pvmput32(txcp, code);
  687.             pvmput32(txcp + 4, (txup->m_enc == 2 ? pvmmydsig : txup->m_enc));
  688.             pvmput32(txcp + 8, txup->m_wid);
  689. #ifdef    MCHECKSUM
  690.             pvmput32(txcp + 12, umbuf_crc(txup));
  691. #else
  692.             pvmput32(txcp + 12, 0);
  693. #endif
  694.             ff = FFSOM;
  695.         }
  696.         if (txfp->fr_link == txup->m_frag)
  697.             ff |= FFEOM;
  698.     /*
  699.     * prepend t-d header
  700.     */
  701.         txcp -= TDFRAGHDR;
  702.         pvmput32(txcp, dtid);
  703.         pvmput32(txcp + 4, pvmmytid);
  704.         pvmput32(txcp + 8, txtogo);
  705.         pvmput32(txcp + 12, 0);            /* to keep putrify happy */
  706.         pvmput8(txcp + 12, ff);
  707.         txtogo += TDFRAGHDR;
  708.         if (pvmdebmask & PDMPACKET) {
  709.             sprintf(pvmtxt, "node_send() dst t%x len %d ptype=%ld node=%ld\n",
  710.                 dtid, txfp->fr_len, ptype, node);
  711.             pvmlogerror(pvmtxt);
  712.         }
  713.  
  714.         if (!txfp->fr_u.dab) {           /* inplace data */
  715.             if ((mpierrcode = PVMCSEND(PMTPACK, txcp, txtogo- txfp->fr_len, 
  716.             node, ptype))
  717. #if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
  718.             && (mpierrcode < 0)
  719. #endif
  720.             ) {
  721.                 pvmlogperror("node_send() csend header");
  722.                 return PvmSysErr;
  723.             }
  724. #ifdef IMA_I860
  725.             /* crecv can't select msg with node #; it can only select type */
  726.             if (_csend(pvmmyvnode, txfp->fr_dat, txfp->fr_len, node, 0) < 0) {
  727.                 pvmlogperror("node_send() csend inplace data");
  728.                 return PvmSysErr;
  729.             }
  730. #else
  731.             /* here the type field is reserved for psend/precv */
  732.             if ((mpierrcode = 
  733.             PVMCSEND(PMTPACK, txfp->fr_dat, txfp->fr_len, node, ptype))
  734. #if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
  735.             && (mpierrcode < 0)
  736. #endif
  737.             ) {
  738.                 pvmlogperror("node_send() csend inplace data");
  739.                 return PvmSysErr;
  740.             }
  741. #endif
  742.         } else {
  743.  
  744.             if (node != pvmhostnode) {
  745. #ifdef IMA_SP2MPI
  746.                 if (mpierrcode = MPI_Isend(txcp, txtogo, MPI_BYTE, node, 
  747.                 PMTPACK, MPI_COMM_WORLD, &outfrags[nextsmhd].tf_mhdl)) {
  748. #else
  749.                 if ((outfrags[nextsmhd].tf_mhdl =
  750.                 ASYNCSEND(PMTPACK, txcp, txtogo, node, ptype)) < 0) {
  751. #endif
  752.                     pvmlogperror("node_send() asyncsend");
  753.                     return PvmSysErr;
  754.                 }
  755.                 /* don't free data 'til frag's sent */
  756.                 txfp->fr_u.ref++;        
  757.                 outfrags[nextsmhd++].tf_fp = txfp;
  758.             } else {
  759.                 /* 
  760.                   * In multicast, the fragment header may get overwritten
  761.                   * if we use ASYNCSEND. This is OK for node-node send,
  762.                   * because the dst field is not used. But pvmd relies on
  763.                   * this value to deliever the packet.
  764.                   */
  765.                 if ((mpierrcode = PVMCSEND(PMTPACK, txcp, txtogo, node, ptype)) 
  766. #if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_I860)
  767.                 && (mpierrcode < 0)
  768. #endif
  769.                 ) {
  770.                     pvmlogperror("node_send() csend");
  771.                     return PvmSysErr;
  772.                 }
  773.             }
  774.         }
  775.  
  776.         txfp = txfp->fr_link;
  777.         if (!txfp->fr_buf)
  778.             txfp = 0;
  779.  
  780.     } while (txfp);
  781. }
  782.  
  783.  
  784. int
  785. node_mcast(mid, dtid, code)
  786.     int mid;    /* message id */
  787.     int dtid;    /* destination */
  788.     int code;    /* type */
  789. {
  790.     int i;
  791.     long count = 0;
  792.     int cc = 0;
  793.     static int *tids;        /* intended recipients of multicast message */
  794.     static int ntask;        /* number of tids */
  795.     int sbf;
  796.     int tmp;
  797.     static struct timeval ztv = { 0, 0 };
  798. #if defined(IMA_PGON)
  799.     long *nodes;
  800.     int mask = pvmtidhmask;        /* host */
  801.     struct pmsg *txup;            /* tx message or null */
  802.     struct frag *txfp = 0;        /* cur tx frag or null */
  803.     int txtogo = 0;                /* len of txfp */
  804.     int ff;
  805.     char *txcp = 0;                /* point to remainder of txfp */
  806.     int len = 0;                /* len of txfp */
  807.     char dummy[TDFRAGHDR+TTMSGHDR];    /* for inplace data */
  808. #endif /*defined(IMA_PGON)*/
  809.  
  810.     /* intercept multicast info */
  811.  
  812.     if (dtid == TIDPVMD) {
  813.         pvm_setrbuf(mid);
  814.         pvm_upkint(&ntask, 1, 1);
  815.         tids = TALLOC(ntask, int, "tids");
  816.         pvm_upkint(tids, ntask, 1);
  817.         sbf = pvm_setsbuf(pvm_mkbuf(PvmDataFoo));
  818.         tmp = TIDGID;
  819.         pvm_pkint(&tmp, 1, 1);
  820.         pvm_setrbuf(pvm_setsbuf(sbf));
  821.         return 0;
  822.     }
  823.     
  824. #if defined(IMA_PGON)
  825.  
  826. /* #if defined(IMA_PGON) */
  827.     mask |= pvmtidtmask;        /* process type */
  828. /* #endif */
  829.     nodes = TALLOC(ntask, long, "nodes");
  830.     for (i = 0; i < ntask; i++) {
  831.         if (TIDISNODE(tids[i]) && (tids[i] & mask) == (pvmmytid & mask))
  832.             nodes[count++] = tids[i] & pvmtidnmask;
  833.         else
  834.             cc = mroute(mid, tids[i], code, &ztv);
  835.     }
  836.     if (count) {
  837.         if (txup = midtobuf(mid)) {
  838.             txfp = txup->m_frag->fr_link;
  839.             txfp = txfp->fr_buf ? txfp : 0;
  840.         }
  841.         while (txfp) {
  842.             if (txfp->fr_u.dab)         /* packed data */
  843.                 txcp = txfp->fr_dat;
  844.             else                         /* inplace data */
  845.                 txcp = dummy + sizeof(dummy);
  846.             txtogo = txfp->fr_len;
  847.  
  848.             ff = 0;
  849.             if (txfp->fr_rlink == txup->m_frag) {
  850.                 txcp -= TTMSGHDR;
  851.                 txtogo += TTMSGHDR;
  852.                 pvmput32(txcp, code);
  853.                 pvmput32(txcp + 4, 
  854.                     (txup->m_enc == 2 ? pvmmydsig : txup->m_enc));
  855.                 pvmput32(txcp + 8, txup->m_wid);
  856. #ifdef    MCHECKSUM
  857.                 pvmput32(txcp + 12, umbuf_crc(txup));
  858. #else
  859.                 pvmput32(txcp + 12, 0);
  860. #endif
  861.                 ff = FFSOM;
  862.             }
  863.             if (txfp->fr_link == txup->m_frag)
  864.                 ff |= FFEOM;
  865.             txcp -= TDFRAGHDR;
  866.             pvmput32(txcp, dtid);
  867.             pvmput32(txcp + 4, pvmmytid);
  868.             pvmput32(txcp + 8, txtogo);
  869.             pvmput32(txcp + 12, 0);            /* to keep putrify happy */
  870.             pvmput8(txcp + 12, ff);
  871.             txtogo += TDFRAGHDR;
  872.             if (pvmdebmask & PDMPACKET) {
  873.                 sprintf(pvmtxt, "node_mcast() len %d\n", txfp->fr_len);
  874.                 pvmlogerror(pvmtxt);
  875.             }
  876.             if (count == numnodes() - 1) {
  877.                 if (!txfp->fr_u.dab) {           /* inplace data */
  878.                     if (_csend(PMTPACK, txcp, txtogo - txfp->fr_len, -1, 
  879.                     pvmmyptype) < 0) {
  880.                         pvmlogperror("node_mcast() csend header");
  881.                         return PvmSysErr;
  882.                     }
  883.                     if (_csend(PMTPACK, txfp->fr_dat, txfp->fr_len, -1, 
  884.                     pvmmyptype) < 0){
  885.                         pvmlogperror("node_mcast() csend inplace data");
  886.                         return PvmSysErr;
  887.                     }
  888.                 } else {                        /* packed */
  889.                     if (_csend(PMTPACK, txcp, (long)txtogo, -1, pvmmyptype) < 0) {
  890.                     pvmlogperror("node_mcast() csend");
  891.                     return PvmSysErr;
  892.                 }
  893.                 }
  894.             
  895.             } else {
  896.                 if (!txfp->fr_u.dab) {           /* inplace data */
  897.                     if (_gsendx(PMTPACK, txcp, txtogo - txfp->fr_len, nodes, 
  898.                     count) < 0) {
  899.                         pvmlogperror("node_mcast() gsendx header");
  900.                         return PvmSysErr;
  901.                     }
  902.                     if (_gsendx(PMTPACK, txfp->fr_dat, txfp->fr_len, nodes, 
  903.                     count) < 0) {
  904.                         pvmlogperror("node_send() gsendx inplace data");
  905.                         return PvmSysErr;
  906.                     }
  907.                 } else {
  908.                     if (_gsendx(PMTPACK, txcp, (long)txtogo, nodes, count) < 0){
  909.                     pvmlogperror("node_mcast() gsendx");
  910.                     return PvmSysErr;
  911.                 }
  912.             }
  913.             }
  914.             txfp = txfp->fr_link;
  915.             if (!txfp->fr_buf)
  916.                 txfp = 0;
  917.         }
  918.     }
  919.     PVM_FREE(nodes);
  920. #endif /*defined(IMA_PGON)*/
  921.  
  922. #if defined(IMA_CM5) || defined(IMA_I860) || defined(IMA_SP2MPI)
  923.     for (i = 0; i < ntask; i++)
  924.         cc = mroute(mid, tids[i], code, &ztv);
  925. #endif
  926.  
  927.     PVM_FREE(tids);
  928.     ntask = 0;
  929.  
  930.     return cc;
  931. }
  932.     
  933.  
  934. /*    msendrecv()
  935. *
  936. *    Single op to send a system message (usually to our pvmd) and get
  937. *    the reply.
  938. *    Returns message handle or negative if error.
  939. */
  940.  
  941. int
  942. msendrecv(other, code)
  943.     int other;                /* dst, src tid */
  944.     int code;                /* message code */
  945. {
  946.     int cc;
  947.     struct pmsg *up;
  948.  
  949.     if (!pvmsbufmid)
  950.         return PvmNoBuf;
  951.  
  952.     /* send code to other */
  953.     if (pvmdebmask & PDMMESSAGE) {
  954.         sprintf(pvmtxt, "msendrecv() to t%x code %d\n", other, code);
  955.         pvmlogerror(pvmtxt);
  956.     }
  957.     if ((cc = mroute(pvmsbuf->m_mid, other, code, (struct timeval *)0)) < 0)
  958.         return cc;
  959.  
  960.     if (code == TM_MCA)        /* for node_mcast() */
  961.         return 1;
  962.  
  963.     /* recv code from other */
  964.     for (up = pvmrxlist->m_link; 1; up = up->m_link) {
  965.         if (up == pvmrxlist) {
  966.             up = up->m_rlink;
  967.             if ((cc = mroute(0, 0, 0, (struct timeval *)0)) < 0)
  968.                 return cc;
  969.             up = up->m_link;
  970.         }
  971.  
  972.         if (pvmdebmask & PDMMESSAGE) {
  973.             sprintf(pvmtxt, "msendrecv() cmp from t%x code %d\n",
  974.                     up->m_src, up->m_tag);
  975.             pvmlogerror(pvmtxt);
  976.         }
  977.         if (up->m_src == other && up->m_tag == code)
  978.             break;
  979.     }
  980.     LISTDELETE(up, m_link, m_rlink);
  981.     if (pvmrbuf)
  982.         umbuf_free(pvmrbuf);
  983.     pvmrbuf = 0;
  984.     if (cc = pvm_setrbuf(up->m_mid))
  985.         return cc;
  986.     return up->m_mid;
  987. }
  988.  
  989.  
  990. #ifdef IMA_SP2MPI
  991.  
  992. /* Relay messages between pvmd and node tasks. */
  993. void relay(dsock)
  994.     int dsock;                    /* pvmd socket */
  995. {
  996.     fd_set wrk_rfds, wrk_wfds, rfds, wfds;
  997.     int nfds;
  998.     struct timeval tout;
  999.     struct frag *frpvmd = 0;    /* (small) frag from pvmd */
  1000.     struct frag *topvmd = 0;    /* (big) frag to pvmd */
  1001.     struct frag *frtask = 0;    /* (big) frag from task */
  1002.     struct frag *totask;        /* (small) frag being sent to task */
  1003.     char *txcp = 0;             /* point to remainder of topvmd */
  1004.     int txtogo = 0;             /* len of remainder of topvmd */
  1005.     int toread;                    /* number of bytes to be read from pvmd */
  1006.     int frtogo;                    /* len of remainder of a fragment */
  1007.     int topvmd_dst;                /* dst of fragment being sent to pvmd */ 
  1008.     int topvmd_src;                /* src of fragment being sent to pvmd */
  1009.     int len;
  1010.     int topvmd_ff;                /* ff of fragment being sent to pvmd */
  1011.     int    dst;                    /* dst of fragment being sent to node */
  1012.     int    node;                    /* node number */
  1013.     MPI_Request rmhd;            /* msg IDs returned by async recv */
  1014.     int n;
  1015.     char *cp;
  1016.     int err;
  1017.     MPI_Status sta;                /* info on pending message */
  1018.     int dummy;
  1019.     int flag;                    /* MPI_Test result */
  1020.     struct frag *hdr;
  1021.  
  1022.     nfds = dsock + 1;
  1023.  
  1024.     frtask = fr_new(MAXFRAGSIZE);
  1025.     if (err = MPI_Irecv(frtask->fr_dat, frtask->fr_max, MPI_BYTE, 
  1026.     MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &rmhd)) {
  1027.         MPI_Error_string(err, pvmtxt, &dummy);
  1028.         pvmlogerror(pvmtxt);
  1029.         pvmlogerror("relay() MPI_Irecv");
  1030.         pvm_exit();
  1031.         MPI_Finalize();
  1032.         exit(9);
  1033.     }
  1034.  
  1035.     FD_ZERO(&wrk_rfds);
  1036.     FD_ZERO(&wrk_wfds);
  1037.     FD_SET(dsock, &wrk_rfds);
  1038.  
  1039.     while (1) {            /* ferry messages between pvmd and tasks */
  1040.         
  1041.         tout.tv_sec = 0;
  1042.         tout.tv_usec = 0;
  1043.         if (!topvmd && !MPI_Test(&rmhd, &flag, &sta) && flag) {
  1044.             topvmd = frtask;
  1045.             MPI_Get_count(&sta, MPI_BYTE, &txtogo);        /* w/ header */
  1046. /*
  1047. sprintf(pvmtxt, "relay() recv %d", txtogo);
  1048. pvmlogerror(pvmtxt);
  1049. */
  1050.             txcp = topvmd->fr_dat;
  1051.             if ((len = pvmget32(txcp + 8) + TDFRAGHDR) != txtogo) {
  1052.                 /* inplace data, head & body sent separately */
  1053.                 if (len > MAXFRAGSIZE) {
  1054.                     hdr = topvmd;
  1055.                     topvmd = fr_new(len);
  1056.                     BCOPY(hdr->fr_dat, topvmd->fr_dat, txtogo);
  1057.                     fr_unref(hdr);
  1058.                     txcp = topvmd->fr_dat;
  1059.                 }
  1060.                 MPI_Recv(topvmd->fr_dat + txtogo, len - txtogo, MPI_BYTE,
  1061.                 sta.MPI_SOURCE, PMTPACK, MPI_COMM_WORLD, &sta);
  1062.                 txtogo = len;
  1063.             }
  1064.             frtask = fr_new(MAXFRAGSIZE);
  1065.             /* ready for the next message */
  1066.             if (err = MPI_Irecv(frtask->fr_dat, frtask->fr_max, MPI_BYTE, 
  1067.             MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &rmhd)) {
  1068.                 MPI_Error_string(err, pvmtxt, &dummy);
  1069.                 pvmlogerror(pvmtxt);
  1070.                 pvmlogerror("relay() MPI_Irecv");
  1071.                 pvm_exit();
  1072.                 MPI_Finalize();
  1073.                 exit(10);
  1074.             }
  1075.             FD_SET(dsock, &wrk_wfds);
  1076.         }
  1077.         rfds = wrk_rfds;
  1078.         wfds = wrk_wfds;
  1079.  
  1080.         if (select(nfds, &rfds, &wfds, (fd_set*)0, &tout) == -1 
  1081.         && errno != EINTR) {
  1082.             pvmlogperror("relay() select");
  1083.             pvm_exit();
  1084.             MPI_Finalize();
  1085.             exit(11);
  1086.         }
  1087.  
  1088.         if (FD_ISSET(dsock, &rfds)) {
  1089.  
  1090.             if (!frpvmd) {
  1091.                 frpvmd = fr_new(MAXFRAGSIZE);
  1092.                 toread = TDFRAGHDR;
  1093.             }
  1094.             n = read(dsock, frpvmd->fr_dat + frpvmd->fr_len, toread);
  1095. /*
  1096. sprintf(pvmtxt, "relay() read %d", n);
  1097. pvmlogerror(pvmtxt);
  1098. */
  1099.             if (n == -1 && errno != EWOULDBLOCK && errno != EINTR) {
  1100.                 pvmlogperror("relay() read pvmd sock");
  1101.                 MPI_Finalize();
  1102.                 exit(12);
  1103.             }
  1104.             if (!n) {
  1105.                 /* pvmlogerror("relay() read EOF on pvmd sock\n"); */
  1106.                 MPI_Finalize();
  1107.                 exit(13);
  1108.             }
  1109.  
  1110.             if ((frpvmd->fr_len += n) < TDFRAGHDR) {
  1111.                 toread -= n;
  1112.                 continue;
  1113.             }
  1114.             n = TDFRAGHDR + pvmget32(frpvmd->fr_dat + 8);   /* header + body */
  1115.             if (frpvmd->fr_len < n) {
  1116.                 if (frpvmd->fr_max < n) {                    /* n > MAXFRAGSIZ */
  1117.                     hdr = frpvmd;
  1118.                     frpvmd = fr_new(n);
  1119.                     BCOPY(hdr->fr_dat, frpvmd->fr_dat, hdr->fr_len);
  1120.                     frpvmd->fr_len = hdr->fr_len;
  1121.                     fr_unref(hdr);
  1122.                 }
  1123.                 toread = n - frpvmd->fr_len;
  1124.                 continue;
  1125.             }
  1126.  
  1127.             dst = pvmget32(frpvmd->fr_dat);
  1128.             node = dst & pvmtidnmask;
  1129.             /* inform precv/recv of a packed message */
  1130.             while (err = 
  1131.             MPI_Bsend(&node, 0, MPI_BYTE, node, PMTPACK, MPI_COMM_WORLD)) {
  1132.                 MPI_Error_string(err, pvmtxt, &dummy);
  1133.                 pvmlogerror(pvmtxt);
  1134.                 sprintf(pvmtxt, "relay() can't send null msg to t%x", dst);
  1135.                 pvmlogerror(pvmtxt);
  1136.             }
  1137.             while (err = MPI_Bsend(frpvmd->fr_dat, frpvmd->fr_len, MPI_BYTE, 
  1138.             node, PMTPACK, MPI_COMM_WORLD)) {
  1139.                 MPI_Error_string(err, pvmtxt, &dummy);
  1140.                 pvmlogerror(pvmtxt);
  1141.                 sprintf(pvmtxt, "relay() can't send to t%x", dst);
  1142.                 pvmlogerror(pvmtxt);
  1143.             }
  1144.             fr_unref(frpvmd);
  1145.             frpvmd = 0;
  1146. /*
  1147. sprintf(pvmtxt, "relay() sent %d to node %d\n", frpvmd->fr_len, (dst & pvmtidnmask));
  1148. pvmlogerror(pvmtxt);
  1149. */
  1150.                 
  1151.         }
  1152.  
  1153.         if (FD_ISSET(dsock, &wfds)) {
  1154.             n = write(dsock, txcp, txtogo);
  1155. /*
  1156. sprintf(pvmtxt, "relay() wrote %d to pvmd\n", n);
  1157. pvmlogerror(pvmtxt);
  1158. */
  1159.             if (n == -1 && errno != EWOULDBLOCK && errno != EINTR) {
  1160.                 pvmlogperror("relay() write pvmd sock");
  1161.                 MPI_Finalize();
  1162.                 exit(14);
  1163.             }
  1164.             if (n > 0 && (txtogo -= n) > 0) 
  1165.                 txcp += n;
  1166.             if (!txtogo) {        /* entire message sent */
  1167.                 FD_CLR(dsock, &wrk_wfds);
  1168.                 fr_unref(topvmd);
  1169.                 topvmd = 0;
  1170.             }
  1171.         }
  1172.     }
  1173. }
  1174.  
  1175.  
  1176. /* We're the "host" process. Connect to pvmd. */
  1177. void
  1178. pvmhost()
  1179. {
  1180.     char *p;
  1181.     int dsock;                    /* pvmd socket */
  1182.     struct sockaddr_in dsadr;    /* address of pvmd socket */
  1183.     int n;
  1184.     int pvminfo[SIZEHINFO];        /* ntask, hostpart, ptid, MTU, NDF */
  1185.     char nullmsg[TDFRAGHDR+TTMSGHDR];
  1186.  
  1187.     if (!(p = getenv("PVMSOCK"))) {
  1188.         pvmlogerror("pvmhost() getenv() pvmd socket\n");
  1189.         MPI_Finalize();
  1190.         exit(2);
  1191.     }
  1192.     mpierrcode = 0;
  1193.     if ((dsock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
  1194.         pvmlogperror("pvmhost() socket");
  1195.         MPI_Finalize();
  1196.         exit(3);
  1197.     }
  1198.     BZERO((char*)&dsadr, sizeof(dsadr));
  1199.     hex_inadport(p, &dsadr);
  1200.     dsadr.sin_family = AF_INET;
  1201.     n = sizeof(dsadr);
  1202.     while (connect(dsock, (struct sockaddr*)&dsadr, n) == -1)
  1203.         if (errno != EINTR) {
  1204.             pvmlogperror("pvmhost() connect");
  1205.             MPI_Finalize();
  1206.             exit(4);
  1207.         }
  1208. #ifndef NOSOCKOPT
  1209.     n = 1;
  1210.     if (setsockopt(dsock, IPPROTO_TCP, TCP_NODELAY, (char*)&n, sizeof(int))
  1211.     == -1) {
  1212.         pvmlogperror("pvmhost() setsockopt");
  1213.         MPI_Finalize();
  1214.         exit(5);
  1215.     }
  1216. #endif
  1217.     if (!(p = getenv("PVMEPID"))) {
  1218.         pvmlogerror("pvmhost() getenv() pid\n");
  1219.         MPI_Finalize();
  1220.         exit(6);
  1221.     }
  1222.     pvmmyupid = atoi(p);
  1223.     BZERO(nullmsg, TDFRAGHDR+TTMSGHDR);
  1224.     pvmput32(nullmsg, TIDPVMD);
  1225.     pvmput32(nullmsg + 4, pvmmyupid);
  1226.     pvmput32(nullmsg + 8, TTMSGHDR);
  1227.     pvmput32(nullmsg + 12, 0);            /* to keep putrify happy */
  1228.     pvmput8(nullmsg + 12, FFSOM|FFEOM);
  1229.     if (write(dsock, nullmsg, TDFRAGHDR+TTMSGHDR) != TDFRAGHDR+TTMSGHDR
  1230.     || read(dsock, pvminfo, SIZEHINFO*sizeof(int)) != SIZEHINFO*sizeof(int)) {
  1231.         pvmlogperror("pvmhost() write/read");
  1232.         MPI_Finalize();
  1233.         exit(8);
  1234.     }
  1235.     MPI_Bcast(pvminfo, SIZEHINFO, MPI_INT, pvmhostnode, MPI_COMM_WORLD);
  1236.     (void)relay(dsock);
  1237. }
  1238.  
  1239. #endif /*IMA_SP2MPI*/
  1240.     
  1241.     
  1242. /*    pvmbeatask()
  1243. *
  1244. *    Initialize libpvm, config process as a task.
  1245. *    This is called as the first step of each libpvm function so no
  1246. *    explicit initialization is required.
  1247. *
  1248. *    Returns 0 if okay, else error code.
  1249. */
  1250.  
  1251. int
  1252. pvmbeatask()
  1253. {
  1254.     int pvminfo[SIZEHINFO];        /* proto, hostpart, ptid, MTU, NDF */
  1255.     long rmid = -1;                /* msg ID returned by irecv() */
  1256.     int i;
  1257.     int cc;
  1258.     int ac = 0;
  1259.     char *p;
  1260.     struct pvmminfo minfo;
  1261.     int n;
  1262. #ifdef IMA_SP2MPI
  1263.     char *msgbuf;                /* buffer for Bsend */
  1264.     int msgbufsiz;
  1265. #endif
  1266.     int need_trcinfo = 0;
  1267.     int new_tracer = 0;
  1268.     char tmask[ 2 * TEV_MASK_LENGTH ];
  1269.     int tbuf, topt;
  1270.     int mid, rbf;
  1271.     TEVDECLS
  1272.  
  1273.     if (pvmmytid != -1)
  1274.         return 0;
  1275.  
  1276.     TEV_EXCLUSIVE;
  1277.  
  1278.     if (p = getenv("PVMTASKDEBUG")) {    /* read the debug mask */
  1279.         pvmdebmask = pvmxtoi(p);
  1280.         if (pvmdebmask) {
  1281.             sprintf(pvmtxt,"task debug mask is 0x%x\n", pvmdebmask);
  1282.             pvmlogerror(pvmtxt);
  1283.         }
  1284.     }
  1285.  
  1286. #if defined(IMA_PGON) || defined(IMA_I860)
  1287.     if ((rmid = _irecv(PMTHOST, pvminfo, sizeof(pvminfo))) < 0) {
  1288.         pvmlogperror("beatask() recv pvminfo");
  1289.         return PvmSysErr;
  1290.     }
  1291.     if ((pvmhostnode = _myhost()) < 0)
  1292.         pvmlogperror("beatask() no controlling process");
  1293. #endif
  1294.  
  1295.     if ((pvm_useruid = getuid()) == -1) {
  1296.         pvmlogerror("can't getuid()\n");
  1297.         return PvmSysErr;
  1298.     }
  1299.  
  1300. #ifdef IMA_CM5
  1301.     CMMD_enable_host();
  1302.     CMMD_receive_bc_from_host(pvminfo, sizeof(pvminfo));
  1303.     CMMD_reset_partition_size(pvminfo[0]);
  1304.     if ((pvmmynode = CMMD_self_address()) >= pvminfo[0])
  1305.         exit(0);        /* no task to spawn on this node  */
  1306.  
  1307.     CMMD_fset_io_mode(stdout, CMMD_independent);
  1308.     CMMD_fset_io_mode(stderr, CMMD_independent);
  1309.     pvmhostnode = CMMD_host_node();
  1310. #endif
  1311.  
  1312. #ifdef IMA_SP2MPI
  1313.     MPI_Init(&ac, NULL);
  1314.     MPI_Comm_rank(MPI_COMM_WORLD, &pvmmynode);
  1315.     MPI_Comm_size(MPI_COMM_WORLD, &pvmhostnode);
  1316.     pvmhostnode--;            /* host is last process in group */
  1317.     if (!(p = getenv("PVMBUFSIZE")) || !(msgbufsiz = strtol(p, (char**)0, 0)))
  1318.         msgbufsiz = MPIBUFSIZ;
  1319.     if (!(msgbuf = malloc(msgbufsiz)))
  1320.         pvmlogerror("relay() out of memory");
  1321.     MPI_Buffer_attach(msgbuf, msgbufsiz);            /* used in psend or relay */
  1322.     if (pvmmynode == pvmhostnode)
  1323.         (void)pvmhost();
  1324.     MPI_Bcast(pvminfo, SIZEHINFO, MPI_INT, pvmhostnode, MPI_COMM_WORLD);
  1325. #endif
  1326.  
  1327.     pvmmyupid = getpid();
  1328.  
  1329. /*
  1330. sprintf(pvmtxt, "pvminfo: %d  %d  %d  %d  %d  node=%d\n", pvminfo[0], pvminfo[1],  pvminfo[2], pvminfo[3], pvminfo[4], pvmmynode);
  1331. pvmlogerror(pvmtxt);
  1332. fflush(stdout);
  1333. */
  1334.  
  1335. #ifdef IMA_PGON
  1336.     if ((pvmmyptype = myptype()) == INVALID_PTYPE)
  1337.         pvmlogerror("beatask() no process type\n");
  1338.     pvmmynode = _mynode();
  1339. #endif
  1340. #ifdef IMA_I860
  1341.     pvmmynode = _mydirect();
  1342.     pvmmyvnode = _mynode();
  1343.     if (_csend(PMTPHYS, &pvmmynode, sizeof(int), pvmhostnode, PVMDPTYPE) < 0)
  1344.         pvmlogperror("beatask() can't send to host");
  1345. #endif
  1346.  
  1347.     /*
  1348.     *    initialize received-message list and fragment reassembly list
  1349.     */
  1350.  
  1351.     rxfrag = TALLOC(1, struct pmsg, "umb");
  1352.     BZERO((char*)rxfrag, sizeof(struct pmsg));
  1353.     rxfrag->m_link = rxfrag->m_rlink = rxfrag;
  1354.  
  1355.     pvmrxlist = TALLOC(1, struct pmsg, "umb");
  1356.     BZERO((char*)pvmrxlist, sizeof(struct pmsg));
  1357.     pvmrxlist->m_link = pvmrxlist->m_rlink = pvmrxlist;
  1358.  
  1359. #if defined(IMA_CM5) || defined(IMA_SP2MPI)
  1360.     precvlist = TALLOC(1, struct pmsg, "umb");
  1361.     BZERO((char*)precvlist, sizeof(struct pmsg));
  1362.     precvlist->m_link = precvlist->m_rlink = precvlist;
  1363. #endif
  1364.  
  1365.     for (i = 0; i < NUMSMHD; i++)
  1366.         outfrags[i].tf_mhdl = PVMMHDNULL;
  1367.  
  1368. #if defined(IMA_PGON) || defined(IMA_I860)
  1369.     _msgwait(rmid);
  1370. #endif
  1371.  
  1372. #if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_SP2MPI)
  1373.     if (pvminfo[0] != TDPROTOCOL) {
  1374.         sprintf(pvmtxt, "beatask() t-d protocol mismatch (%d/%d)\n",
  1375.             TDPROTOCOL, pvminfo[0]);
  1376.         pvmlogerror(pvmtxt);
  1377.         return PvmSysErr;
  1378.     }
  1379. #endif
  1380.  
  1381.     n = 1;
  1382.  
  1383.     mysetpart = pvminfo[n++];
  1384.     pvmmyptid = pvminfo[n++];
  1385.     pvmudpmtu = pvminfo[n++];
  1386.     pvmmydsig = pvminfo[n++];
  1387.  
  1388. #if defined(IMA_PGON)
  1389.     pvmpgonpartsize = pvminfo[n++];
  1390. #endif
  1391.  
  1392.     if (!pvmtrc.outtid) {
  1393.         pvmtrc.outtid = pvminfo[n++];
  1394.         pvmtrc.outctx = pvminfo[n++];
  1395.         pvmtrc.outtag = pvminfo[n++];
  1396.         pvmctrc.outtid = pvmtrc.outtid;
  1397.         pvmctrc.outctx = pvmtrc.outctx;
  1398.         pvmctrc.outtag = pvmtrc.outtag;
  1399.     }
  1400.     if (!pvmtrc.trctid) {
  1401.         pvmtrc.trctid = pvminfo[n++];
  1402.         pvmtrc.trcctx = pvminfo[n++];
  1403.         pvmtrc.trctag = pvminfo[n++];
  1404.         pvmctrc.trctid = pvmtrc.trctid;
  1405.         pvmctrc.trcctx = pvmtrc.trcctx;
  1406.         pvmctrc.trctag = pvmtrc.trctag;
  1407.         new_tracer++;
  1408.     }
  1409.  
  1410.     pvmmytid = mysetpart + pvmmynode;
  1411.     pvmfrgsiz = pvmudpmtu;
  1412.  
  1413.     if (p = getenv("PVMCTX"))
  1414.         pvmmyctx = pvmstrtoi(p);
  1415.  
  1416.     /* get trace mask from envar or zero it */
  1417.  
  1418.     if ( (p = getenv("PVMTMASK")) ) {
  1419.         if ( strlen(p) + 1 == TEV_MASK_LENGTH )
  1420.             BCOPY(p, pvmtrc.tmask, TEV_MASK_LENGTH);
  1421.         else
  1422.             TEV_MASK_INIT(pvmtrc.tmask);
  1423.     } else {
  1424.         TEV_MASK_INIT(pvmtrc.tmask);
  1425.         if ( new_tracer ) need_trcinfo++
  1426.     }
  1427.  
  1428.     BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);
  1429.  
  1430.     /* get trace buffering from envar */
  1431.  
  1432.     if ((p = getenv("PVMTRCBUF")))
  1433.         pvmtrc.trcbuf = atoi( p );
  1434.     else {
  1435.         pvmtrc.trcbuf = 0;
  1436.         if ( new_tracer ) need_trcinfo++
  1437.     }
  1438.     
  1439.     pvmctrc.trcbuf = pvmtrc.trcbuf;
  1440.  
  1441.     /* get trace options from envar */
  1442.  
  1443.     if ((p = getenv("PVMTRCOPT")))
  1444.         pvmtrc.trcopt = atoi( p );
  1445.     else {
  1446.         pvmtrc.trcopt = 0;
  1447.         if ( new_tracer ) need_trcinfo++
  1448.     }
  1449.     
  1450.     pvmctrc.trcopt = pvmtrc.trcopt;
  1451.  
  1452.     BZERO(&minfo, sizeof(minfo));
  1453.     minfo.src = -1;
  1454.     minfo.ctx = SYSCTX_TC;
  1455.     minfo.tag = TC_CONREQ;
  1456.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conreq);
  1457.     minfo.tag = TC_CONACK;
  1458.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_conack);
  1459.     minfo.tag = TC_TASKEXIT;
  1460.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_taskexit);
  1461.     minfo.tag = TC_NOOP;
  1462.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_noop);
  1463.     minfo.tag = TC_SETTRACE;
  1464.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrace);
  1465.     minfo.tag = TC_SETTRCBUF;
  1466.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcbuf);
  1467.     minfo.tag = TC_SETTRCOPT;
  1468.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settrcopt);
  1469.     minfo.tag = TC_SETTMASK;
  1470.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_settmask);
  1471.     minfo.tag = TC_SIBLINGS;
  1472.     pvm_addmhf(minfo.src, minfo.tag, minfo.ctx, pvm_tc_siblings);
  1473.  
  1474.     if ( need_trcinfo )
  1475.     {
  1476.         rbf = pvm_setrbuf( 0 );
  1477.  
  1478.         if ( pvm_recvinfo( PVMTRACERCLASS, 0, PvmMboxDefault ) > 0 )
  1479.         {
  1480.             pvm_upkint(&trctid, 1, 1);
  1481.  
  1482.             pvm_upkint(&trcctx, 1, 1);
  1483.             pvm_upkint(&trctag, 1, 1);
  1484.  
  1485.             pvm_upkint(&outctx, 1, 1);  /* unused here */
  1486.             pvm_upkint(&outtag, 1, 1);  /* unused here */
  1487.  
  1488.             pvm_upkstr(tmask);
  1489.  
  1490.             pvm_upkint(&tbuf, 1, 1);
  1491.             pvm_upkint(&topt, 1, 1);
  1492.  
  1493.             if ( pvmtrc.trctid == trctid && pvmtrc.trcctx == trcctx
  1494.                     && pvmtrc.trctag == trctag )
  1495.             {
  1496.                 if ( strlen(tmask) + 1 == TEV_MASK_LENGTH ) {
  1497.                     BCOPY(tmask, pvmtrc.tmask, TEV_MASK_LENGTH);
  1498.                     BCOPY(pvmtrc.tmask, pvmctrc.tmask, TEV_MASK_LENGTH);
  1499.                 }
  1500.  
  1501.                 pvmtrc.trcbuf = tbuf;
  1502.                 pvmctrc.trcbuf = pvmtrc.trcbuf;
  1503.  
  1504.                 pvmtrc.trcopt = topt;
  1505.                 pvmctrc.trcopt = pvmtrc.trcopt;
  1506.             }
  1507.  
  1508.             pvm_freebuf(pvm_setrbuf(rbf));
  1509.         }
  1510.  
  1511.         else
  1512.             pvm_setrbuf(rbf);
  1513.     }
  1514.  
  1515.     tev_init();
  1516.  
  1517.     if (TEV_AMEXCL) {
  1518.         TEV_ENDEXCL;
  1519.     }
  1520.  
  1521.     return 0;
  1522. }
  1523.  
  1524.  
  1525. int
  1526. pvmendtask()
  1527. {
  1528. #if defined(IMA_PGON) || defined(IMA_I860) || defined(IMA_SP2MPI)
  1529.     int i;
  1530. #ifdef IMA_SP2MPI
  1531.     MPI_Status mpista;
  1532. #endif
  1533.  
  1534.     for (i = 0; i < NUMSMHD; i++)
  1535.         if (outfrags[i].tf_mhdl != PVMMHDNULL)
  1536.             ASYNCWAIT(outfrags[i].tf_mhdl);
  1537. #endif
  1538.  
  1539.     if (pvmmytid != -1) {
  1540.         pvmmytid = -1;
  1541.     }
  1542.  
  1543. #ifdef IMA_CM5
  1544.     CMMD_all_msgs_wait();
  1545. #endif
  1546. #ifdef IMA_SP2MPI
  1547.     MPI_Finalize();
  1548. #endif
  1549.  
  1550.     /* XXX free rxfrag and rxlist */
  1551.  
  1552.     return 0;
  1553. }
  1554.  
  1555.  
  1556. void
  1557. check_for_exit(src)
  1558.     int src;
  1559. {
  1560. }
  1561.  
  1562.  
  1563. /************************
  1564.  **  Libpvm Functions  **
  1565.  **                    **
  1566.  ************************/
  1567.  
  1568.  
  1569. int
  1570. pvm_getfds(fds)        /* XXX this function kinda sucks */
  1571.     int **fds;            /* fd list return */
  1572. {
  1573.     int cc;
  1574.  
  1575.     cc = PvmNotImpl;
  1576.     return (cc < 0 ? lpvmerr("pvm_getfds", cc) : cc);
  1577. }
  1578.  
  1579.  
  1580. /*
  1581.  * receive a message for precv
  1582.  * returns 1 if message is received, 0 if not.
  1583.  * returns -1 on error
  1584.  */
  1585. int
  1586. mpprecv(tid, tag, cp, len, rtid, rtag, rlen)
  1587.     int tid;
  1588.     int tag;
  1589.     void *cp;
  1590.     int len;
  1591.     int *rtid;
  1592.     int *rtag;
  1593.     int *rlen;
  1594. {
  1595. #if defined(IMA_PGON) || defined(IMA_CM5)
  1596.     long info[8];
  1597. #endif
  1598. #ifdef IMA_SP2MPI
  1599.     MPI_Status info;
  1600.     int mpiflag = 0;
  1601.     int mpisiz;
  1602. #endif
  1603.     long node;
  1604.     struct pmsg *up;
  1605.     int cc;
  1606.  
  1607.     node = (tid == -1) ? MPPANYNODE : tid & pvmtidnmask;
  1608. #if defined(IMA_CM5) || defined(IMA_SP2MPI)
  1609.     for (up = precvlist->m_link; up != precvlist; up = up->m_link)
  1610.         /* message picked up by psend */
  1611.         if ((tag == -1 || tag == up->m_tag) 
  1612.         && (tid == -1 || node == up->m_src)) {
  1613.             struct frag *fp;
  1614.  
  1615.             fp = up->m_frag->fr_rlink;
  1616.             if (fp->fr_max < len)
  1617.                 len = fp->fr_max;
  1618.             BCOPY(fp->fr_dat, cp, len);
  1619.             if (rlen)
  1620.                 *rlen = len;
  1621.             if (rtid)
  1622.                 *rtid = up->m_src + (pvmmytid & ~pvmtidnmask);
  1623.             if (rtag)
  1624.                 *rtag = up->m_tag;
  1625.             LISTDELETE(up, m_link, m_rlink);
  1626.             if (pvmdebmask & PDMPACKET) {
  1627.                 sprintf(pvmtxt, "pvm_precv() task %x len %d tag %d\n",
  1628.                     up->m_src + (pvmmytid & ~pvmtidnmask), 
  1629.                     rlen ? *rlen : fp->fr_max, rtag ? *rtag : up->m_tag);
  1630.                     pvmlogerror(pvmtxt);
  1631.             }
  1632.             umbuf_free(up);
  1633.             return 1;
  1634.         }
  1635. #endif
  1636.     
  1637.     /* 
  1638.      * Msg routed from pvmd has type PMTPACK, so it may have already
  1639.      * been picked up by the last pvm_recv. If so, we call pvm_recv
  1640.      * to process it. Note we only need consider the case tid = -1,
  1641.      * because msg psent by another node cannot have type PMTPACK.
  1642.      */
  1643.     if (tid == -1) {
  1644.         for (up = pvmrxlist->m_link; up != pvmrxlist; up = up->m_link)
  1645.             if (tag == -1 || tag == up->m_tag)
  1646.                 return 0;            /* go on to pvm_recv */
  1647.         if (tag != -1)
  1648.             /* wait for msg with tag or PMTPACK */
  1649.             while (!MSGPROBE(MPPANYNODE, tag, info))
  1650.                 if (MSGPROBE(pvmhostnode, PMTPACK, info)) {
  1651.                     if ((cc = mroute(0, 0, 0, (struct timeval *)0)) < 0)
  1652.                         return cc;
  1653.                     for (up = pvmrxlist->m_link; up != pvmrxlist; 
  1654.                     up = up->m_link)
  1655.                         if (tag == up->m_tag)
  1656.                             return 0;            /* go on to pvm_recv */
  1657.                 }
  1658.     }
  1659.  
  1660.     /* 1) from another node: 1a) tid = -1; 1b) tid given; 2) tag = -1 */
  1661. #ifndef IMA_PGON
  1662.     if (tag == -1)
  1663.         tag = MPPANYTAG;
  1664. #endif
  1665.     if ((mpierrcode = 
  1666.     PVMCRECV(node, tag, (char*)cp, len, pvmmyptype, info)) 
  1667. #if defined(IMA_PGON) || defined(IMA_CM5)
  1668.     && (mpierrcode < 0)
  1669. #endif
  1670.     ) {
  1671.         pvmlogperror("precv() PVMCRECV");
  1672.         return PvmSysErr;
  1673.     } 
  1674.     if ((node = MSGSENDER(info)) == pvmhostnode)
  1675.         /* got null msg; go on to pvm_recv */
  1676.         return 0;
  1677.             
  1678.     if (rlen)
  1679.         *rlen = MSGSIZE(info);
  1680.     if (rtid)
  1681.         *rtid = node + (pvmmytid & ~pvmtidnmask);
  1682.     if (rtag)
  1683.         *rtag = MSGTAG(info);
  1684.     if (pvmdebmask & PDMPACKET) {
  1685.         sprintf(pvmtxt, "pvm_precv() task %x len %d tag %d\n",
  1686.             node + (pvmmytid & ~pvmtidnmask), rlen ? *rlen : MSGSIZE(info), 
  1687.             rtag ? *rtag : MSGTAG(info));
  1688.         pvmlogerror(pvmtxt);
  1689.     }
  1690.     return 1;
  1691. }
  1692.  
  1693.  
  1694. int
  1695. pvm_precv(tid, tag, cp, len, dt, rtid, rtag, rlen)
  1696.     int tid;
  1697.     int tag;
  1698.     void *cp;
  1699.     int len;
  1700.     int dt;
  1701.     int *rtid;
  1702.     int *rtag;
  1703.     int *rlen;
  1704. {
  1705.     int nb, mc, src;
  1706.     int rbf;
  1707.     int cc = 0;
  1708.     long ad;
  1709.     TEV_DECLS
  1710.  
  1711.     if (TEV_EXCLUSIVE) {
  1712.         if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_ENTRY)) {
  1713.             TEV_PACK_INT( TEV_DID_RST, TEV_DATA_SCALAR, &tid, 1, 1 );
  1714.             TEV_PACK_INT( TEV_DID_RMC, TEV_DATA_SCALAR, &tag, 1, 1 );
  1715.             ad = (long)cp;
  1716.             TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
  1717.             TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
  1718.             TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
  1719.             TEV_FIN;
  1720.         }
  1721.     }
  1722.  
  1723.     switch (dt) {
  1724.  
  1725.     case PVM_BYTE:
  1726.         len *= sizeof(char);
  1727.         break;
  1728.  
  1729.     case PVM_SHORT:
  1730.     case PVM_USHORT:
  1731.         len *= sizeof(short);
  1732.         break;
  1733.  
  1734.     case PVM_INT:
  1735.     case PVM_UINT:
  1736.         len *= sizeof(int);
  1737.         break;
  1738.  
  1739.     case PVM_LONG:
  1740.     case PVM_ULONG:
  1741.         len *= sizeof(long);
  1742.         break;
  1743.  
  1744.     case PVM_FLOAT:
  1745.         len *= sizeof(float);
  1746.         break;
  1747.  
  1748.     case PVM_CPLX:
  1749.         len *= sizeof(float) * 2;
  1750.         break;
  1751.  
  1752.     case PVM_DOUBLE:
  1753.         len *= sizeof(double);
  1754.         break;
  1755.  
  1756.     case PVM_DCPLX:
  1757.         len *= sizeof(double) * 2;
  1758.         break;
  1759.  
  1760.     case PVM_STR:
  1761.         cc = PvmNotImpl;
  1762.         break;
  1763.  
  1764.     default:
  1765.         cc = PvmBadParam;
  1766.         break;
  1767.     }
  1768.  
  1769.     if (!cc) {
  1770. #if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)
  1771.         int mask = pvmtidhmask | pvmtidtmask;        /* same partition */
  1772.  
  1773.         if (tid == -1 || (TIDISNODE(tid) && (tid & mask) == (pvmmytid & mask)))
  1774.             /*
  1775.              * 1) Don't know where msg is coming from.
  1776.              * 2) Expect msg from a node in the same partition.
  1777.              */
  1778.             if (cc = mpprecv(tid, tag, cp, len, &src, &mc, &nb)) {
  1779.                 /* 1) Whole msg recv'd. 2) Error */
  1780.                 if (rtid)
  1781.                     *rtid = src;
  1782.                 if (rtag)
  1783.                     *rtag = mc;
  1784.                 if (rlen)
  1785.                     *rlen = nb;
  1786.                 goto done;
  1787.             }
  1788. #endif 
  1789.         rbf = pvm_setrbuf(0);
  1790.         cc = pvm_recv(tid, tag);
  1791.         if (cc > 0) {
  1792.             pvm_bufinfo(cc, &nb, &mc, &src);
  1793.             if (rlen)
  1794.                 *rlen = nb;
  1795.             if (nb < len)
  1796.                 len = nb;
  1797.             if (rtag)
  1798.                 *rtag = mc;
  1799.             if (rtid)
  1800.                 *rtid = src;
  1801.             pvm_upkbyte((char *)cp, len, 1);
  1802.             pvm_freebuf(cc);
  1803.             cc = 0;
  1804.         }
  1805.         pvm_setrbuf(rbf);
  1806.     }
  1807.  
  1808. done:
  1809.  
  1810.     if (TEV_AMEXCL) {
  1811.         if (TEV_DO_TRACE(TEV_PRECV,TEV_EVENT_EXIT)) {
  1812.             TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
  1813.             if ( cc < 0 )
  1814.                 nb = mc = src = -1;
  1815.             TEV_PACK_INT( TEV_DID_MNB, TEV_DATA_SCALAR, &nb, 1, 1 );
  1816.             TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &mc, 1, 1 );
  1817.             TEV_PACK_INT( TEV_DID_SRC, TEV_DATA_SCALAR, &src, 1, 1 );
  1818.             TEV_FIN;
  1819.         }
  1820.         TEV_ENDEXCL;
  1821.     }
  1822.  
  1823.     if (cc < 0)
  1824.         lpvmerr("pvm_precv", cc);
  1825.     return cc;
  1826. }
  1827.  
  1828.  
  1829. int
  1830. pvm_psend(tid, tag, cp, len, dt)
  1831.     int tid;
  1832.     int tag;
  1833.     void *cp;
  1834.     int len;
  1835.     int dt;
  1836. {
  1837.     int sbf;
  1838.     int cc = 0;
  1839.     long ad;
  1840.     TEV_DECLS
  1841.  
  1842.     if (TEV_EXCLUSIVE) {
  1843.         if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_ENTRY)) {
  1844.             TEV_PACK_INT( TEV_DID_DST, TEV_DATA_SCALAR, &tid, 1, 1 );
  1845.             TEV_PACK_INT( TEV_DID_MC, TEV_DATA_SCALAR, &tag, 1, 1 );
  1846.             ad = (long)cp;
  1847.             TEV_PACK_LONG( TEV_DID_PDA, TEV_DATA_SCALAR, &ad, 1, 1 );
  1848.             TEV_PACK_INT( TEV_DID_PC, TEV_DATA_SCALAR, &len, 1, 1 );
  1849.             TEV_PACK_INT( TEV_DID_PDT, TEV_DATA_SCALAR, &dt, 1, 1 );
  1850.             TEV_FIN;
  1851.         }
  1852.     }
  1853.  
  1854.     switch (dt) {
  1855.  
  1856.     case PVM_BYTE:
  1857.         len *= sizeof(char);
  1858.         break;
  1859.  
  1860.     case PVM_SHORT:
  1861.     case PVM_USHORT:
  1862.         len *= sizeof(short);
  1863.         break;
  1864.  
  1865.     case PVM_INT:
  1866.     case PVM_UINT:
  1867.         len *= sizeof(int);
  1868.         break;
  1869.  
  1870.     case PVM_LONG:
  1871.     case PVM_ULONG:
  1872.         len *= sizeof(long);
  1873.         break;
  1874.  
  1875.     case PVM_FLOAT:
  1876.         len *= sizeof(float);
  1877.         break;
  1878.  
  1879.     case PVM_CPLX:
  1880.         len *= sizeof(float) * 2;
  1881.         break;
  1882.  
  1883.     case PVM_DOUBLE:
  1884.         len *= sizeof(double);
  1885.         break;
  1886.  
  1887.     case PVM_DCPLX:
  1888.         len *= sizeof(double) * 2;
  1889.         break;
  1890.  
  1891.     case PVM_STR:
  1892.         cc = PvmNotImpl;
  1893.         break;
  1894.  
  1895.     default:
  1896.         cc = PvmBadParam;
  1897.         break;
  1898.     }
  1899.  
  1900.     if (!cc) {
  1901.  
  1902. #if defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)
  1903.  
  1904.         int mask = pvmtidhmask;             /* host */
  1905.         long node = tid & pvmtidnmask;
  1906. #ifdef IMA_CM5
  1907.         CMMD_mcb mhdl;
  1908.         int info;
  1909. #endif
  1910. #ifdef IMA_SP2MPI
  1911.         MPI_Request mhdl;
  1912.         MPI_Status info, mpista;
  1913.         int mpiflag = 0;
  1914.         int mpisiz;
  1915. #endif
  1916.  
  1917.         mask |= pvmtidtmask;                /* process type */
  1918.         if (TIDISNODE(tid) && (tid & mask) == (pvmmytid & mask)) {
  1919.             if (pvmdebmask & PDMPACKET) {
  1920.                 sprintf(pvmtxt, 
  1921.                     "pvm_psend() dst t%x len %d ptype=%ld node=%ld\n",
  1922.                     tid, len, pvmmyptype, tid & pvmtidnmask);
  1923.                 pvmlogerror(pvmtxt);
  1924.             }
  1925. #ifdef IMA_PGON
  1926.             if (PVMCSEND(tag, cp, len, node, pvmmyptype) < 0)
  1927. #endif
  1928. #ifdef IMA_CM5
  1929.             if ((mhdl = ASYNCSEND(tag, cp, len, node, pvmmyptype)) < 0)
  1930. #endif
  1931. #ifdef IMA_SP2MPI
  1932.             if (mpierrcode = MPI_Isend(cp, len, MPI_BYTE, node,
  1933.                 tag, MPI_COMM_WORLD, &mhdl))
  1934. #endif
  1935.             {
  1936.                 pvmlogperror("psend() PVMCSEND");
  1937.                 cc = PvmSysErr;
  1938.                 goto done;
  1939.             }
  1940. #if defined(IMA_CM5) || defined(IMA_SP2MPI)
  1941.             while (!ASYNCDONE(mhdl)) {
  1942.                 int rtag;
  1943.  
  1944.                 /* 
  1945.                  * Psend is really a buffered send. We use async send to
  1946.                  * avoid the overhead of buffering (50%). The behavior of
  1947.                  * psend is now similar to RouteDirect: the sender blocks
  1948.                  * until the other party signals its intention to receive,
  1949.                  * but it can accept incoming messages in the meantime.
  1950.                  * This avoids deadlock when two tasks send messages to
  1951.                  * each other at the same time.
  1952.                  */
  1953.                 if (MSGPROBE(MPPANYNODE, MPPANYTAG, info)) {
  1954.                     if ((rtag = MSGTAG(info)) != PMTPACK) {
  1955.                         struct pmsg *rxup;
  1956.                         struct frag *fp;
  1957.  
  1958.                         rxup = midtobuf(umbuf_new());
  1959.                         rxup->m_src = MSGSENDER(info);
  1960.                         rxup->m_tag = rtag;
  1961.                         fp = fr_new(MSGSIZE(info));
  1962.                         LISTPUTBEFORE(rxup->m_frag, fp, fr_link, fr_rlink);
  1963.                         if ((mpierrcode = PVMCRECV(rxup->m_src, rtag, 
  1964.                         fp->fr_dat, fp->fr_max, pvmmyptype, info)) 
  1965. #ifdef IMA_CM5
  1966.                         && (mpierrcode < 0)
  1967. #endif
  1968.                         ) {
  1969.                             pvmlogperror("psend() PVMCRECV");
  1970.                             cc = PvmSysErr;
  1971.                             goto done;
  1972.                         } 
  1973.                         LISTPUTBEFORE(precvlist, rxup, m_link, m_rlink);
  1974.                     } else {
  1975.                         if ((cc = mroute(0, 0, 0, (struct timeval *)0)) < 0)
  1976.                             goto done;
  1977.                     }
  1978.                 }
  1979.             }
  1980. #ifdef IMA_CM5
  1981.             CMMD_free_mcb(mhdl);
  1982. #endif
  1983. #endif /*defined(IMA_CM5) || defined(IMA_SP2MPI)*/
  1984.         } else
  1985.  
  1986. #endif /*defined(IMA_PGON) || defined(IMA_CM5) || defined(IMA_SP2MPI)*/
  1987.  
  1988.         {
  1989.             sbf = pvm_setsbuf(pvm_mkbuf(PvmDataInPlace));
  1990.             pvm_pkbyte((char *)cp, len, 1);
  1991.             if ((cc = pvm_send(tid, tag)) > 0)
  1992.                 cc = 0;
  1993.             pvm_freebuf(pvm_setsbuf(sbf));
  1994.         }
  1995.     }
  1996.  
  1997. done:
  1998.  
  1999.     if (TEV_AMEXCL) {
  2000.         if (TEV_DO_TRACE(TEV_PSEND,TEV_EVENT_EXIT)) {
  2001.             TEV_PACK_INT( TEV_DID_CC, TEV_DATA_SCALAR, &cc, 1, 1 );
  2002.             TEV_FIN;
  2003.         }
  2004.         TEV_ENDEXCL;
  2005.     }
  2006.  
  2007.     if (cc < 0)
  2008.         lpvmerr("pvm_psend", cc);
  2009.     return cc;
  2010. }
  2011.  
  2012.  
  2013.